package com.easy.mq.listener.adapter;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;

import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.easy.mq.client.RocketClient;
import com.easy.mq.config.consumer.AbstractConsumerConfig;
import com.easy.mq.config.consumer.RocketConsumerConfig;
import com.easy.mq.entry.JsonUtil;
import com.easy.mq.enums.ConsumeResultStatus;
import com.easy.mq.enums.OrderMode;
import com.easy.mq.event.MQEvent;
import com.easy.mq.listener.AbstractMessageListenerContainer;
import com.easy.mq.listener.MQMessageListenerConcurrently;
import com.easy.mq.listener.MQMessageListenerOrder;
import com.easy.mq.listener.ManagerListener;
import com.easy.mq.transition.SerializationFactory;

public class RocketListenerAdapter<T> extends AbstractMessageListenerContainer{
	
	 private static final Logger logger = LoggerFactory.getLogger(RocketListenerAdapter.class);
	
	 private ManagerListener<T> managerListener;
	 
	 private RocketConsumerConfig consumerConfig;
	
	 private Thread thread = null;
	 
	 private DefaultMQPushConsumer mqPushConsumer;
	
	public RocketListenerAdapter(ManagerListener<T> managerListener) {
		this.managerListener = managerListener;
	}


	@Override
	public void registerMessageListener(AbstractConsumerConfig config) {
		//registerConsumer(config);
		ListenerConsumer listenerConsumer = new ListenerConsumer(config);
		thread = new Thread(listenerConsumer);
		thread.setDaemon(true);
		thread.start();
	}
	
	private class ListenerConsumer implements Runnable{
		AbstractConsumerConfig config;
		
		private Timer timer = new Timer("ListenerConsumer-timer", true);
		
		public ListenerConsumer(AbstractConsumerConfig config) {
			this.config = config;
		}
		
		@Override
		public void run() {
			registerConsumer(config);
			timer.schedule(new DaemonTask(this), 30 * 1000, 30 * 1000);
		}
		
	}
	
	class DaemonTask extends TimerTask {

		ListenerConsumer consumer;

		DaemonTask(ListenerConsumer consumer) {
			this.consumer = consumer;
		}

		@Override
		public void run() {
			if(logger.isDebugEnabled()) {
				logger.debug("Detection of consumption:topic:{},group:{}.",consumer.config.getTopic(),consumer.config.getGroup());
			}
		}

	}


	private void registerConsumer(AbstractConsumerConfig config) {
		consumerConfig =(RocketConsumerConfig) config;
		mqPushConsumer = new DefaultMQPushConsumer(consumerConfig.getGroup());  
		mqPushConsumer.setNamesrvAddr(consumerConfig.getAddress());  
		mqPushConsumer.setInstanceName(consumerConfig.getInstanceName());  
        if(StringUtils.isEmpty(consumerConfig.getInstanceName())) {
        	 consumerConfig.setInstanceName(UUID.randomUUID().toString());
		}
         
     	try {
     		mqPushConsumer.subscribe(consumerConfig.getTopic(), "*");
		} catch (MQClientException e1) {
			logger.error(e1.getMessage());
		}
		if (OrderMode.ORDER == consumerConfig.getConsumeMode()) {
			mqPushConsumer.registerMessageListener(new MQMessageListenerOrder() {

				@Override
				public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
					try {
						for (MessageExt ext : msgs) {
							byte[] msg = new RocketClient().getMsg(consumerConfig, ext);
							if ("info".equalsIgnoreCase(consumerConfig.getLogLevel())) {
								logger.info("Consumer news is  : {}", JsonUtil.bean2Json(msg));
							}
							if(msg == null || msg.length <=0) {
								continue;
							}
							MQEvent<Object> message = new MQEvent<Object>();
							message.setContent(msg);
							ConsumeResultStatus status = managerListener.notifyListener(consumerConfig, message);
							if (ConsumeResultStatus.SUCCESS == status) {
								return ConsumeOrderlyStatus.SUCCESS;
							} else {
								return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
							}
						}
						return ConsumeOrderlyStatus.SUCCESS;
					} catch (Exception e) {
						return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
					}
				}
			});

		} else {
			
			mqPushConsumer.registerMessageListener(new MQMessageListenerConcurrently() {

				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
						ConsumeConcurrentlyContext context) {
					try {
						for (MessageExt ext : msgs) {
							byte[] msg = new RocketClient().getMsg(consumerConfig, ext);
							if ("info".equalsIgnoreCase(consumerConfig.getLogLevel())) {
								logger.info("Consumer news is : {}", msg);
							}
							if(msg == null || msg.length <=0) {
								continue;
							}
							MQEvent<Object> message = new MQEvent<Object>();
							if(StringUtils.isNotEmpty(consumerConfig.getParamType())) {
								try {
									if(!"java.lang.String".equals(consumerConfig.getParamType())){
										Class<?> cl = Class.forName(consumerConfig.getParamType());
										message.setContent(SerializationFactory.factory(consumerConfig.getTransfer()).deserializer(msg, cl));
									}else{
										message.setContent(new String(msg,"UTF-8"));	
									}
								}catch(Exception e) {
									logger.error(e.getMessage()+new String(String.valueOf(msg)),e);
								}
							}else {
								message.setContent(new String(msg,"UTF-8"));	
							}
							
							ConsumeResultStatus status = managerListener.notifyListener(consumerConfig, message);
							if (ConsumeResultStatus.SUCCESS == status) {
								return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
							} else {
								return ConsumeConcurrentlyStatus.RECONSUME_LATER;
							}
						}
						return ConsumeConcurrentlyStatus.RECONSUME_LATER;
					} catch (Exception e) {
						logger.error(e.getMessage(),e);
						return ConsumeConcurrentlyStatus.RECONSUME_LATER;
					}
				}
			});
			
			try {
				mqPushConsumer.start();
			} catch (MQClientException e) {
				logger.error(e.getMessage());
			}
		}
	}

}
